package com.xdl.apitest.tabletest.udftest

import com.xdl.apitest.SensorReading
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.table.api.{Table, Tumble}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.api.scala.StreamTableEnvironment
import org.apache.flink.table.functions.{ScalarFunction, TableFunction}
import org.apache.flink.types.Row

/**
 * Flink Table API / SQL example of a user-defined TableFunction (`Split`).
 *
 * Project: FlinkTutorial
 * Package: com.xdl.apitest.tabletest.udftest
 * Version: 1.0
 *
 * Created by guoxiaolong on 2020-08-01-22:21
 */
object TableFunctionTest {

  /** Input file used when no path is passed as the first program argument. */
  private val DefaultSensorFile: String =
    "D:\\Projects\\BigData\\FlinkTutorial\\src\\main\\resources\\sensor.txt"

  /**
   * Demonstrates the user-defined [[TableFunction]] `Split`, invoked both through the
   * Table API (`joinLateral`) and through SQL (`LATERAL TABLE`), printing both results.
   *
   * @param args optional; `args(0)` overrides the sensor input file path
   */
  def main(args: Array[String]): Unit = {
    // 0. Create the streaming execution environment. Event time is required because the
    //    table below declares `timestamp` as a rowtime attribute.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    // 1. Create the table environment.
    val tableEnv = StreamTableEnvironment.create(env)

    // 2. Read the raw text lines.
    val filePath: String = args.headOption.getOrElse(DefaultSensorFile)
    val inputStream: DataStream[String] = env.readTextFile(filePath)

    // Map each comma-separated line to a SensorReading. Fields are trimmed because
    // `toLong` rejects surrounding whitespace (e.g. "sensor_1, 1547718199, 35.8").
    val dataStream: DataStream[SensorReading] = inputStream
      .map(data => {
        val dataArray = data.split(",")
        SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
      })
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(1)) {
          // The source timestamp is presumably in seconds; scale to the milliseconds Flink uses.
          override def extractTimestamp(element: SensorReading): Long = element.timestamp * 1000L
        })

    // 3. Convert the stream to a table, declaring the time attributes inline:
    //    `ts` is the event-time (rowtime) attribute taken from `timestamp`,
    //    `pt` is an appended processing-time attribute.
    val sensorTable: Table =
      tableEnv.fromDataStream(dataStream, 'id, 'temperature, 'timestamp.rowtime as 'ts, 'pt.proctime)

    // 4. Create the UDF instance once and use it from both APIs.
    val splitFunction = new Split("_")

    // Table API: lateral join each row with the rows emitted by the TableFunction.
    val resultTable = sensorTable
      .joinLateral(splitFunction('id) as ('word, 'length))
      .select('id, 'ts, 'word, 'length)

    // SQL: register the view and the UDF, then apply it via LATERAL TABLE.
    tableEnv.createTemporaryView("sensor", sensorTable)
    tableEnv.registerFunction("split", splitFunction)
    val resultSqlTable = tableEnv.sqlQuery(
      """
        |select id, ts, word, length
        |from
        |sensor, lateral table(split(id)) as splitid(word, length)
        |""".stripMargin
    )

    resultTable.toAppendStream[Row].print("result")
    resultSqlTable.toAppendStream[Row].print("sql")

    env.execute("table function test job")
  }

  /**
   * Table function that splits a string on a literal separator and emits one
   * `(word, word.length)` row per token.
   *
   * @param separator separator matched literally (not as a regex), e.g. "_" or "."
   */
  class Split(separator: String) extends TableFunction[(String, Int)] {
    // Quote once so regex metacharacters in `separator` (".", "|", ...) are matched literally.
    private val separatorPattern: String = java.util.regex.Pattern.quote(separator)

    /** Emits one row per token of `str`; a null input (SQL NULL) emits no rows. */
    def eval(str: String): Unit =
      Option(str).foreach(_.split(separatorPattern).foreach(word => collect((word, word.length))))
  }

}
